package com.bw.gmall.realtime.dws.app;

import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.base.BaseApp;
import com.bw.gmall.realtime.common.bean.TrafficHomeDetailPageViewBean;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.function.DorisMapFunction;
import com.bw.gmall.realtime.common.util.DateFormatUtil;
import com.bw.gmall.realtime.common.util.FlinkSinkUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Iterator;
// 通过时间窗口聚合首页和商品详情页的浏览数据

public class DwsTrafficHomeDetailPageViewWindow extends BaseApp {
    public static void main(String[] args) {
        new DwsTrafficHomeDetailPageViewWindow().start( Constant.TOPIC_DWD_TRAFFIC_PAGE,Constant.DWS_TRAFFIC_HOME_DETAIL_PAGE_VIEW_WINDOW,4,10023);

    }
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
        //1.ETL数据
        SingleOutputStreamOperator<JSONObject> etlStream = dataStreamSource.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(s);
                String mid = jsonObject.getJSONObject("common").getString("mid");
                String ts = jsonObject.getString("ts");
                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                if ("home".equals(pageId) || "good_detail".equals(pageId)) {
                    if (mid != null && ts != null) {
                        collector.collect(jsonObject);
                    }
                }
            }
        });
        //2、根据MID分组，
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> processStream = etlStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject jsonObject) throws Exception {
                return jsonObject.getJSONObject("common").getString("mid");
            }
        }).process(new KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean>() {
            private ValueState<String> homeState;
            private ValueState<String> detailState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> homeStateDesc = new ValueStateDescriptor<>("home_state", String.class);
                ValueStateDescriptor<String> detailStateDesc = new ValueStateDescriptor<>("detail_state", String.class);
                homeStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1L)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
                detailStateDesc.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1L)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
                homeState = getRuntimeContext().getState(homeStateDesc);
                detailState = getRuntimeContext().getState(detailStateDesc);
            }

            @Override
            public void processElement(JSONObject jsonObject, KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean>.Context context, Collector<TrafficHomeDetailPageViewBean> collector) throws Exception {
                // 处理逻辑
                String homeValue = homeState.value();
                String detailValue = detailState.value();
                String curDt = DateFormatUtil.tsToDate(jsonObject.getLong("ts"));
                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                long homeCt = 0L;
                long detailCt = 0L;
                if ("home".equals(pageId)) {
                    if (!curDt.equals(homeValue)) {
                        homeCt = 1L;
                        homeState.update(curDt);
                    }
                } else {
                    if (!curDt.equals(detailValue)) {
                        detailCt = 1L;
                        detailState.update(curDt);
                    }
                }
                if (homeCt != 0 || detailCt != 0) {
                    collector.collect(TrafficHomeDetailPageViewBean.builder()
                            .homeUvCt(homeCt)
                            .goodDetailUvCt(detailCt)
                            .ts(jsonObject.getLong("ts"))
                            .build());
                }
            }
        });
        //3、添加水位求HomeUV和DetailUV
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> reduceStream =
                processStream.assignTimestampsAndWatermarks(WatermarkStrategy.<TrafficHomeDetailPageViewBean>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<TrafficHomeDetailPageViewBean>() {
            @Override
            public long extractTimestamp(TrafficHomeDetailPageViewBean trafficHomeDetailPageViewBean, long l) {
                return trafficHomeDetailPageViewBean.getTs();
            }
        })).windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))).reduce(new ReduceFunction<TrafficHomeDetailPageViewBean>() {
            @Override
            public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean t1, TrafficHomeDetailPageViewBean t2) throws Exception {
                t1.setHomeUvCt(t1.getHomeUvCt() + t2.getHomeUvCt());
                t1.setGoodDetailUvCt(t1.getGoodDetailUvCt() + t2.getGoodDetailUvCt());
                return t1;
            }
        }, new ProcessAllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
            @Override
            public void process(ProcessAllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>.Context context, Iterable<TrafficHomeDetailPageViewBean> iterable, Collector<TrafficHomeDetailPageViewBean> collector) throws Exception {
                TimeWindow window = context.window();
                String start = DateFormatUtil.tsToDate(window.getStart());
                String end = DateFormatUtil.tsToDate(window.getEnd());
                String curDate = DateFormatUtil.tsToDate(System.currentTimeMillis());
                Iterator<TrafficHomeDetailPageViewBean> iterator = iterable.iterator();
                while (iterator.hasNext()) {
                    TrafficHomeDetailPageViewBean trafficHomeDetailPageViewBean = iterator.next();
                    trafficHomeDetailPageViewBean.setStt(start);
                    trafficHomeDetailPageViewBean.setEdt(end);
                    trafficHomeDetailPageViewBean.setCurDate(curDate);
                    collector.collect(trafficHomeDetailPageViewBean);
                }
            }
        });
        //4、写入Doris
        // 必须开启CK
        reduceStream.map(new DorisMapFunction<>()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_TRAFFIC_HOME_DETAIL_PAGE_VIEW_WINDOW));
    }
}

